/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.handler;

import org.apache.commons.io.IOUtils;
import org.apache.http.client.HttpClient;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.FileUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CachingDirectoryFactory.CloseListener;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import static org.apache.solr.handler.ReplicationHandler.*;

import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import java.util.zip.InflaterInputStream;

/**
 * <p/>
 * Provides functionality of downloading changed index files as well as config
 * files and a timer for scheduling fetches from the master.
 * </p>
 * 
 * 
 * @since solr 1.4
 */
public class SnapPuller {
	private static final Logger LOG = LoggerFactory.getLogger(SnapPuller.class
			.getName());

	private final String masterUrl;

	private final ReplicationHandler replicationHandler;

	private final Integer pollInterval;

	private String pollIntervalStr;

	private ScheduledExecutorService executorService;

	private volatile long executorStartTime;

	private volatile long replicationStartTime;

	private final SolrCore solrCore;

	private volatile List<Map<String, Object>> filesToDownload;

	private volatile List<Map<String, Object>> confFilesToDownload;

	private volatile List<Map<String, Object>> filesDownloaded;

	private volatile List<Map<String, Object>> confFilesDownloaded;

	private volatile Map<String, Object> currentFile;

	private volatile FileFetcher fileFetcher;

	private volatile ExecutorService fsyncService;

	private volatile boolean stop = false;

	private boolean useInternal = false;

	private boolean useExternal = false;

	/**
	 * Disable the timer task for polling
	 */
	private AtomicBoolean pollDisabled = new AtomicBoolean(false);

	// HttpClient shared by all cores (used if timeout is not specified for a
	// core)
	private static HttpClient client;
	// HttpClient for this instance if connectionTimeout or readTimeout has been
	// specified
	private final HttpClient myHttpClient;

	private static synchronized HttpClient createHttpClient(String connTimeout,
			String readTimeout, String httpBasicAuthUser,
			String httpBasicAuthPassword, boolean useCompression) {
		if (connTimeout == null && readTimeout == null && client != null)
			return client;
		final ModifiableSolrParams httpClientParams = new ModifiableSolrParams();
		httpClientParams.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT,
				connTimeout != null ? connTimeout : "5000");
		httpClientParams.set(HttpClientUtil.PROP_SO_TIMEOUT,
				readTimeout != null ? readTimeout : "20000");
		httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_USER,
				httpBasicAuthUser);
		httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS,
				httpBasicAuthPassword);
		httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION,
				useCompression);
		// Keeping a very high number so that if you have a large number of
		// cores
		// no requests are kept waiting for an idle connection.
		httpClientParams.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
		httpClientParams.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST,
				10000);
		HttpClient httpClient = HttpClientUtil.createClient(httpClientParams);
		if (client == null && connTimeout == null && readTimeout == null)
			client = httpClient;
		return httpClient;
	}

	public SnapPuller(final NamedList initArgs,
			final ReplicationHandler handler, final SolrCore sc) {
		solrCore = sc;
		final SolrParams params = SolrParams.toSolrParams(initArgs);
		String masterUrl = (String) initArgs.get(MASTER_URL);
		if (masterUrl == null)
			throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
					"'masterUrl' is required for a slave");
		if (masterUrl.endsWith("/replication")) {
			masterUrl = masterUrl.substring(0, masterUrl.length() - 12);
			LOG.warn("'masterUrl' must be specified without the /replication suffix");
		}
		this.masterUrl = masterUrl;

		this.replicationHandler = handler;
		pollIntervalStr = (String) initArgs.get(POLL_INTERVAL);
		pollInterval = readInterval(pollIntervalStr);
		String compress = (String) initArgs.get(COMPRESSION);
		useInternal = INTERNAL.equals(compress);
		useExternal = EXTERNAL.equals(compress);
		String connTimeout = (String) initArgs
				.get(HttpClientUtil.PROP_CONNECTION_TIMEOUT);
		String readTimeout = (String) initArgs
				.get(HttpClientUtil.PROP_SO_TIMEOUT);
		String httpBasicAuthUser = (String) initArgs
				.get(HttpClientUtil.PROP_BASIC_AUTH_USER);
		String httpBasicAuthPassword = (String) initArgs
				.get(HttpClientUtil.PROP_BASIC_AUTH_PASS);
		myHttpClient = createHttpClient(connTimeout, readTimeout,
				httpBasicAuthUser, httpBasicAuthPassword, useExternal);
		if (pollInterval != null && pollInterval > 0) {
			startExecutorService();
		} else {
			LOG.info(" No value set for 'pollInterval'. Timer Task not started.");
		}
	}

	private void startExecutorService() {
		Runnable task = new Runnable() {
			public void run() {
				if (pollDisabled.get()) {
					LOG.info("Poll disabled");
					return;
				}
				try {
					executorStartTime = System.currentTimeMillis();
					replicationHandler.doFetch(null, false);
				} catch (Exception e) {
					LOG.error("Exception in fetching index", e);
				}
			}
		};
		executorService = Executors
				.newSingleThreadScheduledExecutor(new DefaultSolrThreadFactory(
						"snapPuller"));
		long initialDelay = pollInterval
				- (System.currentTimeMillis() % pollInterval);
		executorService.scheduleAtFixedRate(task, initialDelay, pollInterval,
				TimeUnit.MILLISECONDS);
		LOG.info("Poll Scheduled at an interval of " + pollInterval + "ms");
	}

	/**
	 * Gets the latest commit version and generation from the master
	 */
	@SuppressWarnings("unchecked")
	NamedList getLatestVersion() throws IOException {
		ModifiableSolrParams params = new ModifiableSolrParams();
		params.set(COMMAND, CMD_INDEX_VERSION);
		params.set(CommonParams.WT, "javabin");
		params.set(CommonParams.QT, "/replication");
		QueryRequest req = new QueryRequest(params);
		SolrServer server = new HttpSolrServer(masterUrl, myHttpClient); // XXX
																			// modify
																			// to
																			// use
																			// shardhandler
		try {
			return server.request(req);
		} catch (SolrServerException e) {
			throw new IOException(e);
		}
	}

	/**
	 * Fetches the list of files in a given index commit point and updates
	 * internal list of files to download.
	 */
	private void fetchFileList(long gen) throws IOException {
		ModifiableSolrParams params = new ModifiableSolrParams();
		params.set(COMMAND, CMD_GET_FILE_LIST);
		params.set(GENERATION, String.valueOf(gen));
		params.set(CommonParams.WT, "javabin");
		params.set(CommonParams.QT, "/replication");
		QueryRequest req = new QueryRequest(params);
		SolrServer server = new HttpSolrServer(masterUrl, myHttpClient); // XXX
																			// modify
																			// to
																			// use
																			// shardhandler

		try {
			NamedList response = server.request(req);

			List<Map<String, Object>> files = (List<Map<String, Object>>) response
					.get(CMD_GET_FILE_LIST);
			if (files != null)
				filesToDownload = Collections.synchronizedList(files);
			else {
				filesToDownload = Collections.emptyList();
				LOG.error("No files to download for index generation: " + gen);
			}

			files = (List<Map<String, Object>>) response.get(CONF_FILES);
			if (files != null)
				confFilesToDownload = Collections.synchronizedList(files);

		} catch (SolrServerException e) {
			throw new IOException(e);
		}
	}

	private boolean successfulInstall = false;

	/**
	 * This command downloads all the necessary files from master to install a
	 * index commit point. Only changed files are downloaded. It also downloads
	 * the conf files (if they are modified).
	 * 
	 * @param core
	 *            the SolrCore
	 * @param forceReplication
	 *            force a replication in all cases
	 * @return true on success, false if slave is already in sync
	 * @throws IOException
	 *             if an exception occurs
	 */
	boolean fetchLatestIndex(SolrCore core, boolean forceReplication)
			throws IOException, InterruptedException {
		successfulInstall = false;
		replicationStartTime = System.currentTimeMillis();
		try {
			// get the current 'replicateable' index version in the master
			NamedList response = null;
			try {
				response = getLatestVersion();
			} catch (Exception e) {
				LOG.error("Master at: " + masterUrl
						+ " is not available. Index fetch failed. Exception: "
						+ e.getMessage());
				return false;
			}
			long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
			long latestGeneration = (Long) response.get(GENERATION);

			IndexCommit commit;
			RefCounted<SolrIndexSearcher> searcherRefCounted = null;
			try {
				searcherRefCounted = core.getNewestSearcher(false);
				if (searcherRefCounted == null) {
					SolrException.log(LOG,
							"No open searcher found - fetch aborted");
					return false;
				}
				commit = searcherRefCounted.get().getIndexReader()
						.getIndexCommit();
			} finally {
				if (searcherRefCounted != null)
					searcherRefCounted.decref();
			}

			if (latestVersion == 0L) {
				if (forceReplication && commit.getGeneration() != 0) {
					// since we won't get the files for an empty index,
					// we just clear ours and commit
					RefCounted<IndexWriter> iw = core.getUpdateHandler()
							.getSolrCoreState().getIndexWriter(core);
					try {
						iw.get().deleteAll();
					} finally {
						iw.decref();
					}
					SolrQueryRequest req = new LocalSolrQueryRequest(core,
							new ModifiableSolrParams());
					core.getUpdateHandler().commit(
							new CommitUpdateCommand(req, false));
				}

				// there is nothing to be replicated
				successfulInstall = true;
				return true;
			}

			if (!forceReplication
					&& IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
				// master and slave are already in sync just return
				LOG.info("Slave in sync with master.");
				successfulInstall = true;
				return true;
			}
			LOG.info("Master's generation: " + latestGeneration);
			LOG.info("Slave's generation: " + commit.getGeneration());
			LOG.info("Starting replication process");
			// get the list of files first
			fetchFileList(latestGeneration);
			// this can happen if the commit point is deleted before we fetch
			// the file list.
			if (filesToDownload.isEmpty())
				return false;
			LOG.info("Number of files in latest index in master: "
					+ filesToDownload.size());

			// Create the sync service
			fsyncService = Executors
					.newSingleThreadExecutor(new DefaultSolrThreadFactory(
							"fsyncService"));
			// use a synchronized list because the list is read by other threads
			// (to show details)
			filesDownloaded = Collections
					.synchronizedList(new ArrayList<Map<String, Object>>());
			// if the generateion of master is older than that of the slave , it
			// means they are not compatible to be copied
			// then a new index direcory to be created and all the files need to
			// be copied
			boolean isFullCopyNeeded = IndexDeletionPolicyWrapper
					.getCommitTimestamp(commit) >= latestVersion
					|| forceReplication;
			File tmpIndexDir = createTempindexDir(core);
			if (isIndexStale()) {
				isFullCopyNeeded = true;
			}
			LOG.info("Starting download to " + tmpIndexDir + " fullCopy="
					+ isFullCopyNeeded);
			successfulInstall = false;
			boolean deleteTmpIdxDir = true;

			// make sure it's the newest known index dir...
			final File indexDir = new File(core.getNewIndexDir());
			Directory oldDirectory = null;
			try {
				downloadIndexFiles(isFullCopyNeeded, tmpIndexDir,
						latestGeneration);
				LOG.info("Total time taken for download : "
						+ ((System.currentTimeMillis() - replicationStartTime) / 1000)
						+ " secs");
				Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);
				if (!modifiedConfFiles.isEmpty()) {
					downloadConfFiles(confFilesToDownload, latestGeneration);
					if (isFullCopyNeeded) {
						successfulInstall = modifyIndexProps(tmpIndexDir
								.getName());
						deleteTmpIdxDir = false;
					} else {
						successfulInstall = copyIndexFiles(tmpIndexDir,
								indexDir);
					}
					if (successfulInstall) {
						LOG.info("Configuration files are modified, core will be reloaded");
						logReplicationTimeAndConfFiles(modifiedConfFiles,
								successfulInstall);// write to a file time of
													// replication and conf
													// files.
						reloadCore();
					}
				} else {
					terminateAndWaitFsyncService();
					if (isFullCopyNeeded) {
						successfulInstall = modifyIndexProps(tmpIndexDir
								.getName());
						deleteTmpIdxDir = false;
						RefCounted<IndexWriter> iw = core.getUpdateHandler()
								.getSolrCoreState().getIndexWriter(core);
						try {
							oldDirectory = iw.get().getDirectory();
						} finally {
							iw.decref();
						}
					} else {
						successfulInstall = copyIndexFiles(tmpIndexDir,
								indexDir);
					}
					if (successfulInstall) {
						logReplicationTimeAndConfFiles(modifiedConfFiles,
								successfulInstall);
					}
				}

				if (isFullCopyNeeded) {
					// we have to do this before commit
					core.getDirectoryFactory().addCloseListener(oldDirectory,
							new CloseListener() {

								@Override
								public void onClose() {
									LOG.info("removing old index directory "
											+ indexDir);
									delTree(indexDir);
								}

							});
				}

				if (successfulInstall) {
					if (isFullCopyNeeded) {
						// let the system know we are changing dir's and the old
						// one
						// may be closed
						core.getDirectoryFactory().doneWithDirectory(
								oldDirectory);
					}
					doCommit(isFullCopyNeeded);
				}

				replicationStartTime = 0;
				return successfulInstall;
			} catch (ReplicationHandlerException e) {
				LOG.error("User aborted Replication");
				return false;
			} catch (SolrException e) {
				throw e;
			} catch (InterruptedException e) {
				throw new InterruptedException("Index fetch interrupted");
			} catch (Exception e) {
				throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
						"Index fetch failed : ", e);
			} finally {
				if (deleteTmpIdxDir) {
					LOG.info("removing temporary index download directory "
							+ tmpIndexDir);
					delTree(tmpIndexDir);
				}
			}
		} finally {
			if (!successfulInstall) {
				logReplicationTimeAndConfFiles(null, successfulInstall);
			}
			filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;
			replicationStartTime = 0;
			fileFetcher = null;
			if (fsyncService != null && !fsyncService.isShutdown())
				fsyncService.shutdownNow();
			fsyncService = null;
			stop = false;
			fsyncException = null;
		}
	}

	private volatile Exception fsyncException;

	/**
	 * terminate the fsync service and wait for all the tasks to complete. If it
	 * is already terminated
	 */
	private void terminateAndWaitFsyncService() throws Exception {
		if (fsyncService.isTerminated())
			return;
		fsyncService.shutdown();
		// give a long wait say 1 hr
		fsyncService.awaitTermination(3600, TimeUnit.SECONDS);
		// if any fsync failed, throw that exception back
		Exception fsyncExceptionCopy = fsyncException;
		if (fsyncExceptionCopy != null)
			throw fsyncExceptionCopy;
	}

	/**
	 * Helper method to record the last replication's details so that we can
	 * show them on the statistics page across restarts.
	 */
	private void logReplicationTimeAndConfFiles(
			Collection<Map<String, Object>> modifiedConfFiles,
			boolean successfulInstall) {
		FileOutputStream outFile = null;
		List<String> confFiles = new ArrayList<String>();
		if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty())
			for (Map<String, Object> map1 : modifiedConfFiles)
				confFiles.add((String) map1.get(NAME));

		Properties props = replicationHandler.loadReplicationProperties();
		long replicationTime = System.currentTimeMillis();
		long replicationTimeTaken = (replicationTime - getReplicationStartTime()) / 1000;
		try {
			int indexCount = 1, confFilesCount = 1;
			if (props.containsKey(TIMES_INDEX_REPLICATED)) {
				indexCount = Integer.valueOf(props
						.getProperty(TIMES_INDEX_REPLICATED)) + 1;
			}
			StringBuffer sb = readToStringBuffer(replicationTime,
					props.getProperty(INDEX_REPLICATED_AT_LIST));
			props.setProperty(INDEX_REPLICATED_AT_LIST, sb.toString());
			props.setProperty(INDEX_REPLICATED_AT,
					String.valueOf(replicationTime));
			props.setProperty(PREVIOUS_CYCLE_TIME_TAKEN,
					String.valueOf(replicationTimeTaken));
			props.setProperty(TIMES_INDEX_REPLICATED,
					String.valueOf(indexCount));
			if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty()) {
				props.setProperty(CONF_FILES_REPLICATED, confFiles.toString());
				props.setProperty(CONF_FILES_REPLICATED_AT,
						String.valueOf(replicationTime));
				if (props.containsKey(TIMES_CONFIG_REPLICATED)) {
					confFilesCount = Integer.valueOf(props
							.getProperty(TIMES_CONFIG_REPLICATED)) + 1;
				}
				props.setProperty(TIMES_CONFIG_REPLICATED,
						String.valueOf(confFilesCount));
			}

			props.setProperty(LAST_CYCLE_BYTES_DOWNLOADED,
					String.valueOf(getTotalBytesDownloaded(this)));
			if (!successfulInstall) {
				int numFailures = 1;
				if (props.containsKey(TIMES_FAILED)) {
					numFailures = Integer.valueOf(props
							.getProperty(TIMES_FAILED)) + 1;
				}
				props.setProperty(TIMES_FAILED, String.valueOf(numFailures));
				props.setProperty(REPLICATION_FAILED_AT,
						String.valueOf(replicationTime));
				sb = readToStringBuffer(replicationTime,
						props.getProperty(REPLICATION_FAILED_AT_LIST));
				props.setProperty(REPLICATION_FAILED_AT_LIST, sb.toString());
			}
			File f = new File(solrCore.getDataDir(), REPLICATION_PROPERTIES);
			outFile = new FileOutputStream(f);
			props.store(outFile, "Replication details");
			outFile.close();
		} catch (Exception e) {
			LOG.warn("Exception while updating statistics", e);
		} finally {
			IOUtils.closeQuietly(outFile);
		}
	}

	static long getTotalBytesDownloaded(SnapPuller snappuller) {
		long bytesDownloaded = 0;
		// get size from list of files to download
		for (Map<String, Object> file : snappuller.getFilesDownloaded()) {
			bytesDownloaded += (Long) file.get(SIZE);
		}

		// get size from list of conf files to download
		for (Map<String, Object> file : snappuller.getConfFilesDownloaded()) {
			bytesDownloaded += (Long) file.get(SIZE);
		}

		// get size from current file being downloaded
		Map<String, Object> currentFile = snappuller.getCurrentFile();
		if (currentFile != null) {
			if (currentFile.containsKey("bytesDownloaded")) {
				bytesDownloaded += (Long) currentFile.get("bytesDownloaded");
			}
		}
		return bytesDownloaded;
	}

	private StringBuffer readToStringBuffer(long replicationTime, String str) {
		StringBuffer sb = new StringBuffer();
		List<String> l = new ArrayList<String>();
		if (str != null && str.length() != 0) {
			String[] ss = str.split(",");
			for (int i = 0; i < ss.length; i++) {
				l.add(ss[i]);
			}
		}
		sb.append(replicationTime);
		if (!l.isEmpty()) {
			for (int i = 0; i < l.size() || i < 9; i++) {
				if (i == l.size() || i == 9)
					break;
				String s = l.get(i);
				sb.append(",").append(s);
			}
		}
		return sb;
	}

	private void doCommit(boolean isFullCopyNeeded) throws IOException {
		SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
				new ModifiableSolrParams());
		// reboot the writer on the new index and get a new searcher
		solrCore.getUpdateHandler().newIndexWriter(isFullCopyNeeded);

		try {
			// first try to open an NRT searcher so that the new
			// IndexWriter is registered with the reader
			Future[] waitSearcher = new Future[1];
			solrCore.getSearcher(true, false, waitSearcher, true);
			if (waitSearcher[0] != null) {
				try {
					waitSearcher[0].get();
				} catch (InterruptedException e) {
					SolrException.log(LOG, e);
				} catch (ExecutionException e) {
					SolrException.log(LOG, e);
				}
			}

			// update our commit point to the right dir
			CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
			cuc.waitSearcher = false;
			cuc.openSearcher = false;
			solrCore.getUpdateHandler().commit(cuc);

		} finally {
			req.close();
		}
	}

	/**
	 * All the files are copied to a temp dir first
	 */
	private File createTempindexDir(SolrCore core) {
		String tmpIdxDirName = "index."
				+ new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT)
						.format(new Date());
		File tmpIdxDir = new File(core.getDataDir(), tmpIdxDirName);
		tmpIdxDir.mkdirs();
		return tmpIdxDir;
	}

	private void reloadCore() {
		new Thread() {
			@Override
			public void run() {
				try {
					solrCore.getCoreDescriptor().getCoreContainer()
							.reload(solrCore.getName());
				} catch (Exception e) {
					LOG.error("Could not restart core ", e);
				}
			}
		}.start();
	}

	private void downloadConfFiles(
			List<Map<String, Object>> confFilesToDownload, long latestGeneration)
			throws Exception {
		LOG.info("Starting download of configuration files from master: "
				+ confFilesToDownload);
		confFilesDownloaded = Collections
				.synchronizedList(new ArrayList<Map<String, Object>>());
		File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(),
				"conf." + getDateAsStr(new Date()));
		try {
			boolean status = tmpconfDir.mkdirs();
			if (!status) {
				throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
						"Failed to create temporary config folder: "
								+ tmpconfDir.getName());
			}
			for (Map<String, Object> file : confFilesToDownload) {
				String saveAs = (String) (file.get(ALIAS) == null ? file
						.get(NAME) : file.get(ALIAS));
				fileFetcher = new FileFetcher(tmpconfDir, file, saveAs, true,
						latestGeneration);
				currentFile = file;
				fileFetcher.fetchFile();
				confFilesDownloaded.add(new HashMap<String, Object>(file));
			}
			// this is called before copying the files to the original conf dir
			// so that if there is an exception avoid corrupting the original
			// files.
			terminateAndWaitFsyncService();
			copyTmpConfFiles2Conf(tmpconfDir);
		} finally {
			delTree(tmpconfDir);
		}
	}

	/**
	 * Download the index files. If a new index is needed, download all the
	 * files.
	 * 
	 * @param downloadCompleteIndex
	 *            is it a fresh index copy
	 * @param tmpIdxDir
	 *            the directory to which files need to be downloadeed to
	 * @param latestGeneration
	 *            the version number
	 */
	private void downloadIndexFiles(boolean downloadCompleteIndex,
			File tmpIdxDir, long latestGeneration) throws Exception {
		String indexDir = solrCore.getIndexDir();
		for (Map<String, Object> file : filesToDownload) {
			File localIndexFile = new File(indexDir, (String) file.get(NAME));
			if (!localIndexFile.exists() || downloadCompleteIndex) {
				fileFetcher = new FileFetcher(tmpIdxDir, file,
						(String) file.get(NAME), false, latestGeneration);
				currentFile = file;
				fileFetcher.fetchFile();
				filesDownloaded.add(new HashMap<String, Object>(file));
			} else {
				LOG.info("Skipping download for " + localIndexFile);
			}
		}
	}

	/**
	 * All the files which are common between master and slave must have same
	 * size else we assume they are not compatible (stale).
	 * 
	 * @return true if the index stale and we need to download a fresh copy,
	 *         false otherwise.
	 */
	private boolean isIndexStale() {
		for (Map<String, Object> file : filesToDownload) {
			File localIndexFile = new File(solrCore.getIndexDir(),
					(String) file.get(NAME));
			if (localIndexFile.exists()
					&& localIndexFile.length() != (Long) file.get(SIZE)) {
				// file exists and size is different, therefore we must assume
				// corrupted index
				return true;
			}
		}
		return false;
	}

	/**
	 * Copy a file by the File#renameTo() method. If it fails, it is considered
	 * a failure
	 * <p/>
	 */
	private boolean copyAFile(File tmpIdxDir, File indexDir, String fname,
			List<String> copiedfiles) {
		File indexFileInTmpDir = new File(tmpIdxDir, fname);
		File indexFileInIndex = new File(indexDir, fname);
		boolean success = indexFileInTmpDir.renameTo(indexFileInIndex);
		if (!success) {
			try {
				LOG.error("Unable to move index file from: "
						+ indexFileInTmpDir + " to: " + indexFileInIndex
						+ " Trying to do a copy");
				FileUtils.copyFile(indexFileInTmpDir, indexFileInIndex);
				success = true;
			} catch (IOException e) {
				LOG.error("Unable to copy index file from: "
						+ indexFileInTmpDir + " to: " + indexFileInIndex, e);
			}
		}

		if (!success) {
			for (String f : copiedfiles) {
				File indexFile = new File(indexDir, f);
				if (indexFile.exists())
					indexFile.delete();
			}
			delTree(tmpIdxDir);
			return false;
		}
		return true;
	}

	/**
	 * Copy all index files from the temp index dir to the actual index. The
	 * segments_N file is copied last.
	 */
	private boolean copyIndexFiles(File tmpIdxDir, File indexDir) {
		String segmentsFile = null;
		List<String> copiedfiles = new ArrayList<String>();
		for (Map<String, Object> f : filesDownloaded) {
			String fname = (String) f.get(NAME);
			// the segments file must be copied last
			// or else if there is a failure in between the
			// index will be corrupted
			if (fname.startsWith("segments_")) {
				// The segments file must be copied in the end
				// Otherwise , if the copy fails index ends up corrupted
				segmentsFile = fname;
				continue;
			}
			if (!copyAFile(tmpIdxDir, indexDir, fname, copiedfiles))
				return false;
			copiedfiles.add(fname);
		}
		// copy the segments file last
		if (segmentsFile != null) {
			if (!copyAFile(tmpIdxDir, indexDir, segmentsFile, copiedfiles))
				return false;
		}
		return true;
	}

	/**
	 * The conf files are copied to the tmp dir to the conf dir. A backup of the
	 * old file is maintained
	 */
	private void copyTmpConfFiles2Conf(File tmpconfDir) {
		File confDir = new File(solrCore.getResourceLoader().getConfigDir());
		for (File file : tmpconfDir.listFiles()) {
			File oldFile = new File(confDir, file.getName());
			if (oldFile.exists()) {
				File backupFile = new File(confDir, oldFile.getName() + "."
						+ getDateAsStr(new Date(oldFile.lastModified())));
				boolean status = oldFile.renameTo(backupFile);
				if (!status) {
					throw new SolrException(
							SolrException.ErrorCode.SERVER_ERROR,
							"Unable to rename: " + oldFile + " to: "
									+ backupFile);
				}
			}
			boolean status = file.renameTo(oldFile);
			if (status) {
			} else {
				throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
						"Unable to rename: " + file + " to: " + oldFile);
			}
		}
	}

	private String getDateAsStr(Date d) {
		return new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT)
				.format(d);
	}

	/**
	 * If the index is stale by any chance, load index from a different dir in
	 * the data dir.
	 */
	private boolean modifyIndexProps(String tmpIdxDirName) {
		LOG.info("New index installed. Updating index properties... index="
				+ tmpIdxDirName);
		File idxprops = new File(solrCore.getDataDir() + "index.properties");
		Properties p = new Properties();
		if (idxprops.exists()) {
			InputStream is = null;
			try {
				is = new FileInputStream(idxprops);
				p.load(is);
			} catch (Exception e) {
				LOG.error("Unable to load index.properties");
			} finally {
				IOUtils.closeQuietly(is);
			}
		}
		p.put("index", tmpIdxDirName);
		FileOutputStream os = null;
		try {
			os = new FileOutputStream(idxprops);
			p.store(os, "index properties");
		} catch (Exception e) {
			throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
					"Unable to write index.properties", e);
		} finally {
			IOUtils.closeQuietly(os);
		}
		return true;
	}

	private final Map<String, FileInfo> confFileInfoCache = new HashMap<String, FileInfo>();

	/**
	 * The local conf files are compared with the conf files in the master. If
	 * they are same (by checksum) do not copy.
	 * 
	 * @param confFilesToDownload
	 *            The list of files obtained from master
	 * 
	 * @return a list of configuration files which have changed on the master
	 *         and need to be downloaded.
	 */
	private Collection<Map<String, Object>> getModifiedConfFiles(
			List<Map<String, Object>> confFilesToDownload) {
		if (confFilesToDownload == null || confFilesToDownload.isEmpty())
			return Collections.EMPTY_LIST;
		// build a map with alias/name as the key
		Map<String, Map<String, Object>> nameVsFile = new HashMap<String, Map<String, Object>>();
		NamedList names = new NamedList();
		for (Map<String, Object> map : confFilesToDownload) {
			// if alias is present that is the name the file may have in the
			// slave
			String name = (String) (map.get(ALIAS) == null ? map.get(NAME)
					: map.get(ALIAS));
			nameVsFile.put(name, map);
			names.add(name, null);
		}
		// get the details of the local conf files with the same alias/name
		List<Map<String, Object>> localFilesInfo = replicationHandler
				.getConfFileInfoFromCache(names, confFileInfoCache);
		// compare their size/checksum to see if
		for (Map<String, Object> fileInfo : localFilesInfo) {
			String name = (String) fileInfo.get(NAME);
			Map<String, Object> m = nameVsFile.get(name);
			if (m == null)
				continue; // the file is not even present locally (so must be
							// downloaded)
			if (m.get(CHECKSUM).equals(fileInfo.get(CHECKSUM))) {
				nameVsFile.remove(name); // checksums are same so the file need
											// not be downloaded
			}
		}
		return nameVsFile.isEmpty() ? Collections.EMPTY_LIST : nameVsFile
				.values();
	}

	/**
	 * Delete the directory tree recursively
	 */
	static boolean delTree(File dir) {
		if (dir == null || !dir.exists())
			return false;
		boolean isSuccess = true;
		File contents[] = dir.listFiles();
		if (contents != null) {
			for (File file : contents) {
				if (file.isDirectory()) {
					boolean success = delTree(file);
					if (!success) {
						LOG.warn("Unable to delete directory : " + file);
						isSuccess = false;
					}
				} else {
					boolean success = file.delete();
					if (!success) {
						LOG.warn("Unable to delete file : " + file);
						isSuccess = false;
						return false;
					}
				}
			}
		}
		return isSuccess && dir.delete();
	}

	/**
	 * Disable periodic polling
	 */
	void disablePoll() {
		pollDisabled.set(true);
		LOG.info("inside disable poll, value of pollDisabled = " + pollDisabled);
	}

	/**
	 * Enable periodic polling
	 */
	void enablePoll() {
		pollDisabled.set(false);
		LOG.info("inside enable poll, value of pollDisabled = " + pollDisabled);
	}

	/**
	 * Stops the ongoing pull
	 */
	void abortPull() {
		stop = true;
	}

	long getReplicationStartTime() {
		return replicationStartTime;
	}

	List<Map<String, Object>> getConfFilesToDownload() {
		// make a copy first because it can be null later
		List<Map<String, Object>> tmp = confFilesToDownload;
		// create a new instance. or else iterator may fail
		return tmp == null ? Collections.EMPTY_LIST
				: new ArrayList<Map<String, Object>>(tmp);
	}

	List<Map<String, Object>> getConfFilesDownloaded() {
		// make a copy first because it can be null later
		List<Map<String, Object>> tmp = confFilesDownloaded;
		// NOTE: it's safe to make a copy of a SynchronizedCollection(ArrayList)
		return tmp == null ? Collections.EMPTY_LIST
				: new ArrayList<Map<String, Object>>(tmp);
	}

	List<Map<String, Object>> getFilesToDownload() {
		// make a copy first because it can be null later
		List<Map<String, Object>> tmp = filesToDownload;
		return tmp == null ? Collections.EMPTY_LIST
				: new ArrayList<Map<String, Object>>(tmp);
	}

	List<Map<String, Object>> getFilesDownloaded() {
		List<Map<String, Object>> tmp = filesDownloaded;
		return tmp == null ? Collections.EMPTY_LIST
				: new ArrayList<Map<String, Object>>(tmp);
	}

	Map<String, Object> getCurrentFile() {
		Map<String, Object> tmp = currentFile;
		FileFetcher tmpFileFetcher = fileFetcher;
		if (tmp == null)
			return null;
		tmp = new HashMap<String, Object>(tmp);
		if (tmpFileFetcher != null)
			tmp.put("bytesDownloaded", tmpFileFetcher.bytesDownloaded);
		return tmp;
	}

	boolean isPollingDisabled() {
		return pollDisabled.get();
	}

	Long getNextScheduledExecTime() {
		Long nextTime = null;
		if (executorStartTime > 0)
			nextTime = executorStartTime + pollInterval;
		return nextTime;
	}

	private static class ReplicationHandlerException extends
			InterruptedException {
		public ReplicationHandlerException(String message) {
			super(message);
		}
	}

	/**
	 * The class acts as a client for ReplicationHandler.FileStream. It
	 * understands the protocol of wt=filestream
	 * 
	 * @see org.apache.solr.handler.ReplicationHandler.FileStream
	 */
	private class FileFetcher {
		boolean includeChecksum = true;

		private File copy2Dir;

		String fileName;

		String saveAs;

		long size, lastmodified;

		long bytesDownloaded = 0;

		FileChannel fileChannel;

		private FileOutputStream fileOutputStream;

		byte[] buf = new byte[1024 * 1024];

		Checksum checksum;

		File file;

		int errorCount = 0;

		private boolean isConf;

		private boolean aborted = false;

		private Long indexGen;

		FileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
				boolean isConf, long latestGen) throws IOException {
			this.copy2Dir = dir;
			this.fileName = (String) fileDetails.get(NAME);
			this.size = (Long) fileDetails.get(SIZE);
			this.isConf = isConf;
			this.saveAs = saveAs;
			if (fileDetails.get(LAST_MODIFIED) != null) {
				lastmodified = (Long) fileDetails.get(LAST_MODIFIED);
			}
			indexGen = latestGen;

			this.file = new File(copy2Dir, saveAs);

			File parentDir = this.file.getParentFile();
			if (!parentDir.exists()) {
				if (!parentDir.mkdirs()) {
					throw new SolrException(
							SolrException.ErrorCode.SERVER_ERROR,
							"Failed to create (sub)directory for file: "
									+ saveAs);
				}
			}

			this.fileOutputStream = new FileOutputStream(file);
			this.fileChannel = this.fileOutputStream.getChannel();

			if (includeChecksum)
				checksum = new Adler32();
		}

		/**
		 * The main method which downloads file
		 */
		void fetchFile() throws Exception {
			try {
				while (true) {
					final FastInputStream is = getStream();
					int result;
					try {
						// fetch packets one by one in a single request
						result = fetchPackets(is);
						if (result == 0 || result == NO_CONTENT) {
							// if the file is downloaded properly set the
							// timestamp same as that in the server
							if (file.exists() && lastmodified > 0)
								file.setLastModified(lastmodified);
							return;
						}
						// if there is an error continue. But continue from the
						// point where it got broken
					} finally {
						IOUtils.closeQuietly(is);
					}
				}
			} finally {
				cleanup();
				// if cleanup suceeds . The file is downloaded fully. do an
				// fsync
				fsyncService.submit(new Runnable() {
					public void run() {
						try {
							FileUtils.sync(file);
						} catch (IOException e) {
							fsyncException = e;
						}
					}
				});
			}
		}

		private int fetchPackets(FastInputStream fis) throws Exception {
			byte[] intbytes = new byte[4];
			byte[] longbytes = new byte[8];
			try {
				while (true) {
					if (stop) {
						stop = false;
						aborted = true;
						throw new ReplicationHandlerException(
								"User aborted replication");
					}
					long checkSumServer = -1;
					fis.readFully(intbytes);
					// read the size of the packet
					int packetSize = readInt(intbytes);
					if (packetSize <= 0) {
						LOG.warn("No content recieved for file: " + currentFile);
						return NO_CONTENT;
					}
					if (buf.length < packetSize)
						buf = new byte[packetSize];
					if (checksum != null) {
						// read the checksum
						fis.readFully(longbytes);
						checkSumServer = readLong(longbytes);
					}
					// then read the packet of bytes
					fis.readFully(buf, 0, packetSize);
					// compare the checksum as sent from the master
					if (includeChecksum) {
						checksum.reset();
						checksum.update(buf, 0, packetSize);
						long checkSumClient = checksum.getValue();
						if (checkSumClient != checkSumServer) {
							LOG.error("Checksum not matched between client and server for: "
									+ currentFile);
							// if checksum is wrong it is a problem return for
							// retry
							return 1;
						}
					}
					// if everything is fine, write down the packet to the file
					fileChannel.write(ByteBuffer.wrap(buf, 0, packetSize));
					bytesDownloaded += packetSize;
					if (bytesDownloaded >= size)
						return 0;
					// errorcount is always set to zero after a successful
					// packet
					errorCount = 0;
				}
			} catch (ReplicationHandlerException e) {
				throw e;
			} catch (Exception e) {
				LOG.warn("Error in fetching packets ", e);
				// for any failure , increment the error count
				errorCount++;
				// if it fails for the same pacaket for MAX_RETRIES fail and
				// come out
				if (errorCount > MAX_RETRIES) {
					throw new SolrException(
							SolrException.ErrorCode.SERVER_ERROR,
							"Fetch failed for file:" + fileName, e);
				}
				return ERR;
			}
		}

		/**
		 * The webcontainer flushes the data only after it fills the buffer
		 * size. So, all data has to be read as readFully() other wise it fails.
		 * So read everything as bytes and then extract an integer out of it
		 */
		private int readInt(byte[] b) {
			return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
					| ((b[2] & 0xff) << 8) | (b[3] & 0xff));

		}

		/**
		 * Same as above but to read longs from a byte array
		 */
		private long readLong(byte[] b) {
			return (((long) (b[0] & 0xff)) << 56)
					| (((long) (b[1] & 0xff)) << 48)
					| (((long) (b[2] & 0xff)) << 40)
					| (((long) (b[3] & 0xff)) << 32)
					| (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16)
					| ((b[6] & 0xff) << 8) | ((b[7] & 0xff));

		}

		/**
		 * cleanup everything
		 */
		private void cleanup() {
			try {
				// close the FileOutputStream (which also closes the Channel)
				fileOutputStream.close();
			} catch (Exception e) {/* noop */
				LOG.error("Error closing the file stream: " + this.saveAs, e);
			}
			if (bytesDownloaded != size) {
				// if the download is not complete then
				// delete the file being downloaded
				try {
					file.delete();
				} catch (Exception e) {
					LOG.error("Error deleting file in cleanup" + e.getMessage());
				}
				// if the failure is due to a user abort it is returned nomally
				// else an exception is thrown
				if (!aborted)
					throw new SolrException(
							SolrException.ErrorCode.SERVER_ERROR,
							"Unable to download " + fileName
									+ " completely. Downloaded "
									+ bytesDownloaded + "!=" + size);
			}
		}

		/**
		 * Open a new stream using HttpClient
		 */
		FastInputStream getStream() throws IOException {
			SolrServer s = new HttpSolrServer(masterUrl, myHttpClient, null); // XXX
																				// use
																				// shardhandler
			ModifiableSolrParams params = new ModifiableSolrParams();

			// //the method is command=filecontent
			params.set(COMMAND, CMD_GET_FILE);
			params.set(GENERATION, Long.toString(indexGen));
			params.set(CommonParams.QT, "/replication");
			// add the version to download. This is used to reserve the download
			if (isConf) {
				// set cf instead of file for config file
				params.set(CONF_FILE_SHORT, fileName);
			} else {
				params.set(FILE, fileName);
			}
			if (useInternal) {
				params.set(COMPRESSION, "true");
			}
			// use checksum
			if (this.includeChecksum) {
				params.set(CHECKSUM, true);
			}
			// wt=filestream this is a custom protocol
			params.set(CommonParams.WT, FILE_STREAM);
			// This happen if there is a failure there is a retry. the
			// offset=<sizedownloaded> ensures that
			// the server starts from the offset
			if (bytesDownloaded > 0) {
				params.set(OFFSET, Long.toString(bytesDownloaded));
			}

			NamedList response;
			InputStream is = null;
			try {
				QueryRequest req = new QueryRequest(params);
				response = s.request(req);
				is = (InputStream) response.get("stream");
				if (useInternal) {
					is = new InflaterInputStream(is);
				}
				return new FastInputStream(is);
			} catch (Throwable t) {
				// close stream on error
				IOUtils.closeQuietly(is);
				throw new IOException("Could not download file '" + fileName
						+ "'", t);
			}
		}
	}

	NamedList getDetails() throws IOException, SolrServerException {
		ModifiableSolrParams params = new ModifiableSolrParams();
		params.set(COMMAND, CMD_DETAILS);
		params.set("slave", false);
		params.set(CommonParams.QT, "/replication");
		SolrServer server = new HttpSolrServer(masterUrl, myHttpClient); // XXX
																			// use
																			// shardhandler
		QueryRequest request = new QueryRequest(params);
		return server.request(request);
	}

	static Integer readInterval(String interval) {
		if (interval == null)
			return null;
		int result = 0;
		if (interval != null) {
			Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
			if (m.find()) {
				String hr = m.group(1);
				String min = m.group(2);
				String sec = m.group(3);
				result = 0;
				try {
					if (sec != null && sec.length() > 0)
						result += Integer.parseInt(sec);
					if (min != null && min.length() > 0)
						result += (60 * Integer.parseInt(min));
					if (hr != null && hr.length() > 0)
						result += (60 * 60 * Integer.parseInt(hr));
					result *= 1000;
				} catch (NumberFormatException e) {
					throw new SolrException(
							SolrException.ErrorCode.SERVER_ERROR,
							INTERVAL_ERR_MSG);
				}
			} else {
				throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
						INTERVAL_ERR_MSG);
			}

		}
		return result;
	}

	public void destroy() {
		if (executorService != null)
			executorService.shutdown();
	}

	String getMasterUrl() {
		return masterUrl;
	}

	String getPollInterval() {
		return pollIntervalStr;
	}

	private static final int MAX_RETRIES = 5;

	private static final int NO_CONTENT = 1;

	private static final int ERR = 2;

	public static final String REPLICATION_PROPERTIES = "replication.properties";

	public static final String POLL_INTERVAL = "pollInterval";

	public static final String INTERVAL_ERR_MSG = "The " + POLL_INTERVAL
			+ " must be in this format 'HH:mm:ss'";

	private static final Pattern INTERVAL_PATTERN = Pattern
			.compile("(\\d*?):(\\d*?):(\\d*)");

	static final String INDEX_REPLICATED_AT = "indexReplicatedAt";

	static final String TIMES_INDEX_REPLICATED = "timesIndexReplicated";

	static final String CONF_FILES_REPLICATED = "confFilesReplicated";

	static final String CONF_FILES_REPLICATED_AT = "confFilesReplicatedAt";

	static final String TIMES_CONFIG_REPLICATED = "timesConfigReplicated";

	static final String LAST_CYCLE_BYTES_DOWNLOADED = "lastCycleBytesDownloaded";

	static final String TIMES_FAILED = "timesFailed";

	static final String REPLICATION_FAILED_AT = "replicationFailedAt";

	static final String PREVIOUS_CYCLE_TIME_TAKEN = "previousCycleTimeInSeconds";

	static final String INDEX_REPLICATED_AT_LIST = "indexReplicatedAtList";

	static final String REPLICATION_FAILED_AT_LIST = "replicationFailedAtList";
}
